import com.tplhk.thread.completeableFuture.service.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;

/**
 * @ClassName : FutureLesson1
 * @Description : 一个例子走进CompletableFuture
 * @Author : jqxu
 * @Date: 2022/4/27 9:11
 **/
@Slf4j
@SpringBootTest(classes = {FutureLesson2.class})
public class FutureLesson2 {

    /**
     * allof 的使用
     *
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws ExecutionException
     */
    @Test
    public void allof() throws InterruptedException, TimeoutException, ExecutionException {
        long start = System.currentTimeMillis();
        // 结果集
        List<String> list = new ArrayList<>();

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5);
        // 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture，join等待执行完毕。返回结果whenComplete获取
        CompletableFuture[] cfs = taskList.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> {
                            log.info("{}，{}", integer, Thread.currentThread().getName());
                            return calc(integer);
                        }, executorService)
                                .thenApplyAsync(h -> {
                                    log.info(":::{}，{}", h, Thread.currentThread().getName());
                                    if (new Random().nextInt(10) % 2 == 0) {
                                        log.info("{}抛了个运算异常", h);
                                        int i = 12 / 0;
                                    }
                                    return Integer.toString(h);

                                })
                                .handle((s, e) -> {
                                    if (null != e) {
                                        System.out.println("异常了！！！！" + LocalDateTime.now());
                                        return null;
                                    }
                                    System.out.println("任务" + s + "完成!result=" + s + LocalDateTime.now());
                                    // list 也可以保存结果
                                    list.add(s);
                                    //  结果保存到 future 中
                                    return s;
                                })
                ).toArray(CompletableFuture[]::new);

        //阻塞等待所有任务完成. 使用 get ，调用方处理异常，就不会导致调用方线程永久阻塞
        CompletableFuture.allOf(cfs).get();

        List<String> collect = Arrays.stream(cfs).filter(item -> null != item.join()).map(item -> item.join().toString()).collect(Collectors.toList());
        System.out.println(LocalDateTime.now() + ",collect=" + collect + ",耗时=" + (System.currentTimeMillis() - start));
        System.out.println(LocalDateTime.now() + ", list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
    }

    public int calc(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(5000);//任务1耗时3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任务5耗时5秒
            } else {
                Thread.sleep(5000);//其它任务耗时1秒
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }


    /**
     * complex 的使用
     * 1. 在线程中取得cars列表
     * 2. 遍历cars ,每个car  在独立线程中调用 rating(carName) 得到一个评分
     * 3. 所有 cars 评分得到后，打印car信息和评分
     *
     * @throws InterruptedException
     * @throws TimeoutException
     * @throws ExecutionException
     */
    @Test
    public void complex() throws InterruptedException, TimeoutException, ExecutionException {
        long start = System.currentTimeMillis();
        ExecutorService taskExecutor = Executors.newFixedThreadPool(10);
        log.info("{} ", Thread.currentThread().getName());

        CompletableFuture.supplyAsync(() -> getCarInfo(), taskExecutor).thenCompose((v) -> {
            log.info("{} ", Thread.currentThread().getName());
            List<CompletableFuture<CarInfo>> completableFutureList = v.stream().map(car -> rating(car.getCarName(), car.getId())
                    .thenApply((score) -> {
                        car.setScore(score);
                        return car;
                    })).collect(Collectors.toList());

            return CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]))
                    .thenApply(all -> completableFutureList.stream()
                            .map(item -> item.handle((carinfo, e) -> {
                                if (null != e) {
                                    log.info("{} error!!!", carinfo.getCarName());
                                    return null;
                                }
                                return carinfo;
                            }))
                            .map(item -> item.join())
                            .filter(Objects::nonNull).collect(Collectors.toList()));
        }).whenComplete((list, e) -> {
            list.stream().forEach(item -> {
                log.info(item.toString());
            });
        });


        Thread.sleep(30 * 1000);
    }


    public List<CarInfo> getCarInfo() {
        log.info("{} ", Thread.currentThread().getName());
        List<CarInfo> list = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            CarInfo carInfo = new CarInfo();
            carInfo.setId(i);
            carInfo.setCarName("car" + i);
            list.add(carInfo);
        }
        return list;
    }


    private CompletableFuture<Integer> rating(String carName, int i) {
        log.info("{},{} ", i, Thread.currentThread().getName());

        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(i * 5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Random().nextInt(100);
        });
    }

}
